/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.carbondata.datasource

import java.io.File

import org.apache.commons.codec.binary.{Base64, Hex}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.util.BinaryUtil

class SparkCarbonDataSourceBinaryTest extends QueryTest with BeforeAndAfterAll {

  var writerPath = s"$target/SparkCarbonFileFormat/WriterOutput/"
  var outputPath = writerPath + 2
  // getCanonicalPath gives path with \, but the code expects /.
  writerPath = writerPath.replace("\\", "/")

  var sdkPath = new File(this.getClass.getResource("/").getPath + "../../../../sdk/sdk/")
    .getCanonicalPath

  def buildTestBinaryData(): Any = {
    FileUtils.deleteDirectory(new File(writerPath))
    FileUtils.deleteDirectory(new File(outputPath))

    val sourceImageFolder = sdkPath + "/src/test/resources/image/flowers"
    val sufAnnotation = ".txt"
    BinaryUtil.binaryToCarbon(sourceImageFolder, writerPath, sufAnnotation, ".jpg")
  }

  private def cleanTestData() = {
    FileUtils.deleteDirectory(new File(writerPath))
    FileUtils.deleteDirectory(new File(outputPath))
  }

  override def beforeAll(): Unit = {
    defaultConfig()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    buildTestBinaryData()

    FileUtils.deleteDirectory(new File(outputPath))
    sql("DROP TABLE IF EXISTS sdkOutputTable")
  }

  override def afterAll(): Unit = {
    cleanTestData()
    sql("DROP TABLE IF EXISTS sdkOutputTable")
  }

  test("Test direct sql read carbon") {
    assert(new File(writerPath).exists())
    checkAnswer(
      sql(s"SELECT COUNT(*) FROM carbon.`$writerPath`"),
      Seq(Row(3)))
  }

  test("Test read image carbon with spark carbon file format, generate by sdk, CTAS") {
    sql("DROP TABLE IF EXISTS binaryCarbon")
    sql("DROP TABLE IF EXISTS binaryCarbon3")
    FileUtils.deleteDirectory(new File(outputPath))
    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')")
      sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" +
          " AS SELECT * FROM binaryCarbon")
    } else {
      sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
      sql(s"CREATE TABLE binaryCarbon3 USING CARBON LOCATION '$outputPath'" +
          " AS SELECT * FROM binaryCarbon")
    }
    checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
      Seq(Row(3)))
    checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
      Seq(Row(3)))
    sql("DROP TABLE IF EXISTS binaryCarbon")
    sql("DROP TABLE IF EXISTS binaryCarbon3")
    FileUtils.deleteDirectory(new File(outputPath))
  }

  test("Don't support sort_columns") {
    sql("DROP TABLE IF EXISTS binaryTable")
    var exception = intercept[Exception] {
      sql(
        s"""
           | CREATE TABLE binaryTable (
           |    id DOUBLE,
           |    label BOOLEAN,
           |    name STRING,
           |    image BINARY,
           |    autoLabel BOOLEAN)
           | using carbon
           | options('SORT_COLUMNS'='image')
            """.stripMargin)
      // TODO: it should throw exception when create table
      sql("SELECT COUNT(*) FROM binaryTable").collect()
    }
    assert(exception.getCause.getMessage
      .contains(
        "sort columns not supported for array, struct, map, double, float, decimal, varchar, " +
        "binary"))

    sql("DROP TABLE IF EXISTS binaryTable")
    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      exception = intercept[Exception] {
        sql(
          s"""
             | CREATE TABLE binaryTable
             | using carbon
             | options(PATH '$writerPath',
             |  'SORT_COLUMNS'='image')
                """.stripMargin)
        sql("SELECT COUNT(*) FROM binaryTable").collect()
      }
    } else {
      exception = intercept[Exception] {
        sql(
          s"""
             | CREATE TABLE binaryTable
             | using carbon
             | options('SORT_COLUMNS'='image')
             | LOCATION '$writerPath'
                """.stripMargin)
        sql("SELECT COUNT(*) FROM binaryTable").collect()
      }
    }
    assert(exception.getMessage.contains("Cannot use sort columns during infer schema"))


    sql("DROP TABLE IF EXISTS binaryTable")
    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      exception = intercept[Exception] {
        sql(
          s"""
             | CREATE TABLE binaryTable (
             |    id DOUBLE,
             |    label BOOLEAN,
             |    name STRING,
             |    image BINARY,
             |    autoLabel BOOLEAN)
             | using carbon
             | options(PATH '$writerPath',
             | 'SORT_COLUMNS'='image')
                 """.stripMargin)
        sql("SELECT COUNT(*) FROM binaryTable").collect()
      }
    } else {
      exception = intercept[Exception] {
        sql(
          s"""
             | CREATE TABLE binaryTable (
             |    id DOUBLE,
             |    label BOOLEAN,
             |    name STRING,
             |    image BINARY,
             |    autoLabel BOOLEAN)
             | using carbon
             | options('SORT_COLUMNS'='image')
             | LOCATION '$writerPath'
                 """.stripMargin)
        sql("SELECT COUNT(*) FROM binaryTable").collect()
      }
    }
    assert(exception.getCause.getMessage
      .contains(
        "sort columns not supported for array, struct, map, double, float, decimal, varchar, " +
        "binary"))
  }

  test("Don't support long_string_columns for binary") {
    sql("DROP TABLE IF EXISTS binaryTable")
    val exception = intercept[Exception] {
      sql(
        s"""
           | CREATE TABLE binaryTable (
           |    id DOUBLE,
           |    label BOOLEAN,
           |    name STRING,
           |    image BINARY,
           |    autoLabel BOOLEAN)
           | using carbon
           | options('long_string_columns'='image')
       """.stripMargin)
      sql("SELECT COUNT(*) FROM binaryTable").collect()
    }
    assert(exception.getCause.getMessage
      .contains("long string column : image is not supported for data type: BINARY"))
  }

  test("Don't support insert into partition table") {
    if (SparkUtil.isSparkVersionXAndAbove("2.2")) {
      sql("DROP TABLE IF EXISTS binaryCarbon")
      sql("DROP TABLE IF EXISTS binaryCarbon2")
      sql("DROP TABLE IF EXISTS binaryCarbon3")
      sql("DROP TABLE IF EXISTS binaryCarbon4")
      sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
      sql(
        s"""
           | CREATE TABLE binaryCarbon2(
           |    binaryId INT,
           |    binaryName STRING,
           |    binary BINARY,
           |    labelName STRING,
           |    labelContent STRING
           |) USING CARBON""".stripMargin)
      sql(
        s"""
           | CREATE TABLE binaryCarbon3(
           |    binaryId INT,
           |    binaryName STRING,
           |    binary BINARY,
           |    labelName STRING,
           |    labelContent STRING
           |) USING CARBON partitioned by (binary) """.stripMargin)
      sql(
        "select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where " +
        "binaryId=0")
        .collect()

      sql(
        "insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from " +
        "binaryCarbon where binaryId=0 ")
      val carbonResult2 = sql("SELECT * FROM binaryCarbon2")

      sql(
        "create table binaryCarbon4 using carbon select binaryId,binaryName,binary,labelName," +
        "labelContent from binaryCarbon where binaryId=0 ")
      val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
      val carbonResult = sql("SELECT * FROM binaryCarbon")

      assert(3 == carbonResult.collect().length)
      assert(1 == carbonResult4.collect().length)
      assert(1 == carbonResult2.collect().length)
      checkAnswer(carbonResult4, carbonResult2)

      try {
        sql(
          "insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent " +
          "from binaryCarbon where binaryId=0 ")
        assert(false)
      } catch {
        case e: Exception =>
          e.printStackTrace()
          assert(true)
      }
      sql("DROP TABLE IF EXISTS binaryCarbon")
      sql("DROP TABLE IF EXISTS binaryCarbon2")
      sql("DROP TABLE IF EXISTS binaryCarbon3")
      sql("DROP TABLE IF EXISTS binaryCarbon4")
    }
  }

  test("Test unsafe as false") {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
    FileUtils.deleteDirectory(new File(outputPath))
    sql("DROP TABLE IF EXISTS binaryCarbon")
    sql("DROP TABLE IF EXISTS binaryCarbon3")
    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
      sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')")
      sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" +
          " AS SELECT * FROM binaryCarbon")
    } else {
      sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
      sql(s"CREATE TABLE binaryCarbon3 USING CARBON LOCATION '$outputPath'" +
          " AS SELECT * FROM binaryCarbon")
    }
    checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
      Seq(Row(3)))
    checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
      Seq(Row(3)))
    sql("DROP TABLE IF EXISTS binaryCarbon")
    sql("DROP TABLE IF EXISTS binaryCarbon3")

    FileUtils.deleteDirectory(new File(outputPath))
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
        CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
  }

  test("insert into for hive and carbon, CTAS") {
    sql("DROP TABLE IF EXISTS hiveTable")
    sql("DROP TABLE IF EXISTS carbon_table")
    sql("DROP TABLE IF EXISTS hiveTable2")
    sql("DROP TABLE IF EXISTS carbon_table2")
    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS hivetable (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | row format delimited fields terminated by ','
             """.stripMargin)
    sql("insert into hivetable values(1,true,'Bob','binary',false)")
    sql("insert into hivetable values(2,false,'Xu','test',true)")

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS carbon_table (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | using carbon
             """.stripMargin)
    sql("insert into carbon_table values(1,true,'Bob','binary',false)")
    sql("insert into carbon_table values(2,false,'Xu','test',true)")

    val hexHiveResult = sql("SELECT hex(image) FROM hivetable")
    val hexCarbonResult = sql("SELECT hex(image) FROM carbon_table")
    checkAnswer(hexHiveResult, hexCarbonResult)
    hexCarbonResult.collect().foreach { each =>
      val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
      assert("binary".equals(result)
             || "test".equals(result))
    }

    val base64HiveResult = sql("SELECT base64(image) FROM hivetable")
    val base64CarbonResult = sql("SELECT base64(image) FROM carbon_table")
    checkAnswer(base64HiveResult, base64CarbonResult)
    base64CarbonResult.collect().foreach { each =>
      val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
      assert("binary".equals(result)
             || "test".equals(result))
    }

    val carbonResult = sql("SELECT * FROM carbon_table")
    val hiveResult = sql("SELECT * FROM hivetable")

    assert(2 == carbonResult.collect().length)
    assert(2 == hiveResult.collect().length)
    checkAnswer(hiveResult, carbonResult)
    carbonResult.collect().foreach { each =>
      if (1 == each.get(0)) {
        assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
      } else if (2 == each.get(0)) {
        assert("test".equals(new String(each.getAs[Array[Byte]](3))))
      } else {
        assert(false)
      }
    }

    sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
    sql("CREATE TABLE carbon_table2  USING CARBON AS SELECT * FROM hivetable")
    val carbonResult2 = sql("SELECT * FROM carbon_table2")
    val hiveResult2 = sql("SELECT * FROM hivetable2")
    checkAnswer(hiveResult2, carbonResult2)
    checkAnswer(carbonResult, carbonResult2)
    checkAnswer(hiveResult, hiveResult2)
    assert(2 == carbonResult2.collect().length)
    assert(2 == hiveResult2.collect().length)

    sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
    sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
    val carbonResult3 = sql("SELECT * FROM carbon_table2")
    val hiveResult3 = sql("SELECT * FROM hivetable2")
    checkAnswer(carbonResult3, hiveResult3)
    assert(4 == carbonResult3.collect().length)
    assert(4 == hiveResult3.collect().length)
  }

  test("insert into for parquet and carbon, CTAS") {
    sql("DROP TABLE IF EXISTS parquetTable")
    sql("DROP TABLE IF EXISTS carbon_table")
    sql("DROP TABLE IF EXISTS parquetTable2")
    sql("DROP TABLE IF EXISTS carbon_table2")
    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS parquettable (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | using parquet
             """.stripMargin)
    sql("insert into parquettable values(1,true,'Bob','binary',false)")
    sql("insert into parquettable values(2,false,'Xu','test',true)")

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS carbon_table (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | using carbon
             """.stripMargin)
    sql("insert into carbon_table values(1,true,'Bob','binary',false)")
    sql("insert into carbon_table values(2,false,'Xu','test',true)")
    val carbonResult = sql("SELECT * FROM carbon_table")
    val parquetResult = sql("SELECT * FROM parquettable")

    assert(2 == carbonResult.collect().length)
    assert(2 == parquetResult.collect().length)
    checkAnswer(parquetResult, carbonResult)
    carbonResult.collect().foreach { each =>
      if (1 == each.get(0)) {
        assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
      } else if (2 == each.get(0)) {
        assert("test".equals(new String(each.getAs[Array[Byte]](3))))
      } else {
        assert(false)
      }
    }

    sql("CREATE TABLE parquettable2 AS SELECT * FROM carbon_table")
    sql("CREATE TABLE carbon_table2  USING CARBON AS SELECT * FROM parquettable")
    val carbonResult2 = sql("SELECT * FROM carbon_table2")
    val parquetResult2 = sql("SELECT * FROM parquettable2")
    checkAnswer(parquetResult2, carbonResult2)
    checkAnswer(carbonResult, carbonResult2)
    checkAnswer(parquetResult, parquetResult2)
    assert(2 == carbonResult2.collect().length)
    assert(2 == parquetResult2.collect().length)

    sql("INSERT INTO parquettable2 SELECT * FROM carbon_table")
    sql("INSERT INTO carbon_table2 SELECT * FROM parquettable")
    val carbonResult3 = sql("SELECT * FROM carbon_table2")
    val parquetResult3 = sql("SELECT * FROM parquettable2")
    checkAnswer(carbonResult3, parquetResult3)
    assert(4 == carbonResult3.collect().length)
    assert(4 == parquetResult3.collect().length)
  }

  test("insert into carbon as select from hive after hive load data") {
    sql("DROP TABLE IF EXISTS hiveTable")
    sql("DROP TABLE IF EXISTS carbon_table")
    sql("DROP TABLE IF EXISTS hiveTable2")
    sql("DROP TABLE IF EXISTS carbon_table2")

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS hivetable (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | row format delimited fields terminated by '|'
             """.stripMargin)
    sql(
      s"""
         | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv'
         | INTO TABLE hivetable
             """.stripMargin)

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS carbon_table (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | using carbon
             """.stripMargin)
    sql("insert into carbon_table select * from hivetable")

    sqlContext.udf.register("decodeHex", (str: String) =>
      Hex.decodeHex(str.toList.map(_.toInt.toBinaryString).mkString.toCharArray))
    sqlContext.udf.register("unHexValue", (str: String) =>
      org.apache.spark.sql.catalyst.expressions.Hex
        .unhex(str.toList.map(_.toInt.toBinaryString).mkString.getBytes))
    sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))

    val udfHexResult = sql("SELECT decodeHex(image) FROM carbon_table")
    val unHexResult = sql("SELECT unHexValue(image) FROM carbon_table")
    checkAnswer(udfHexResult, unHexResult)

    val udfBase64Result = sql("SELECT decodeBase64(image) FROM carbon_table")
    val unbase64Result = sql("SELECT unbase64(image) FROM carbon_table")
    checkAnswer(udfBase64Result, unbase64Result)

    val carbonResult = sql("SELECT * FROM carbon_table")
    val hiveResult = sql("SELECT * FROM hivetable")

    assert(3 == carbonResult.collect().length)
    assert(3 == hiveResult.collect().length)
    checkAnswer(hiveResult, carbonResult)
    carbonResult.collect().foreach { each =>
      if (2 == each.get(0)) {
        assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3))))
      } else if (1 == each.get(0)) {
        assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))))
      } else if (3 == each.get(0)) {
        assert("".equals(new String(each.getAs[Array[Byte]](3)))
               || "\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))))
      } else {
        assert(false)
      }
    }

    sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
    sql("CREATE TABLE carbon_table2  USING CARBON AS SELECT * FROM hivetable")
    val carbonResult2 = sql("SELECT * FROM carbon_table2")
    val hiveResult2 = sql("SELECT * FROM hivetable2")
    checkAnswer(hiveResult2, carbonResult2)
    checkAnswer(carbonResult, carbonResult2)
    checkAnswer(hiveResult, hiveResult2)
    assert(3 == carbonResult2.collect().length)
    assert(3 == hiveResult2.collect().length)

    sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
    sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
    val carbonResult3 = sql("SELECT * FROM carbon_table2")
    val hiveResult3 = sql("SELECT * FROM hivetable2")
    checkAnswer(carbonResult3, hiveResult3)
    assert(6 == carbonResult3.collect().length)
    assert(6 == hiveResult3.collect().length)

  }

  test("filter for hive and carbon") {
    sql("DROP TABLE IF EXISTS hiveTable")
    sql("DROP TABLE IF EXISTS carbon_table")

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS hivetable (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | row format delimited fields terminated by ','
             """.stripMargin)
    sql("insert into hivetable values(1,true,'Bob','binary',false)")
    sql("insert into hivetable values(2,false,'Xu','test',true)")

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS carbon_table (
         |    id int,
         |    label boolean,
         |    name string,
         |    image binary,
         |    autoLabel boolean)
         | using carbon
             """.stripMargin)
    sql("insert into carbon_table values(1,true,'Bob','binary',false)")
    sql("insert into carbon_table values(2,false,'Xu','test',true)")

    // filter with equal
    val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)")
    val carbonResult = sql("SELECT * FROM carbon_table where image=cast('binary' as binary)")

    checkAnswer(hiveResult, carbonResult)
    assert(1 == carbonResult.collect().length)
    carbonResult.collect().foreach { each =>
      assert(1 == each.get(0))
      assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
    }

    // filter with non string
    val exception = intercept[Exception] {
      sql("SELECT * FROM carbon_table where image=binary").collect()
    }
    assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))

    // filter with not equal
    val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)")
    val carbonResult3 = sql("SELECT * FROM carbon_table where image!=cast('binary' as binary)")
    checkAnswer(hiveResult3, carbonResult3)
    assert(1 == carbonResult3.collect().length)
    carbonResult3.collect().foreach { each =>
      assert(2 == each.get(0))
      assert("test".equals(new String(each.getAs[Array[Byte]](3))))
    }

    // filter with in
    val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))")
    val carbonResult4 = sql("SELECT * FROM carbon_table where image in (cast('binary' as binary))")
    checkAnswer(hiveResult4, carbonResult4)
    assert(1 == carbonResult4.collect().length)
    carbonResult4.collect().foreach { each =>
      assert(1 == each.get(0))
      assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
    }

    // filter with not in
    val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))")
    val carbonResult5 = sql(
      "SELECT * FROM carbon_table where image not in (cast('binary' as binary))")
    checkAnswer(hiveResult5, carbonResult5)
    assert(1 == carbonResult5.collect().length)
    carbonResult5.collect().foreach { each =>
      assert(2 == each.get(0))
      assert("test".equals(new String(each.getAs[Array[Byte]](3))))
    }
  }

  test("Spark DataSource don't support update, delete") {
    sql("DROP TABLE IF EXISTS carbon_table")
    sql("DROP TABLE IF EXISTS carbon_table2")

    sql(
      s"""
         | CREATE TABLE IF NOT EXISTS carbon_table (
         |    id int,
         |    label boolean,
         |    name string,
         |    binaryField binary,
         |    autoLabel boolean)
         | using carbon
             """.stripMargin)
    sql("insert into carbon_table values(1,true,'Bob','binary',false)")
    sql("insert into carbon_table values(2,false,'Xu','test',true)")

    val carbonResult = sql("SELECT * FROM carbon_table")

    carbonResult.collect().foreach { each =>
      if (1 == each.get(0)) {
        assert("binary".equals(new String(each.getAs[Array[Byte]](3))))
      } else if (2 == each.get(0)) {
        assert("test".equals(new String(each.getAs[Array[Byte]](3))))
      } else {
        assert(false)
      }
    }

    var exception = intercept[Exception] {
      sql("UPDATE carbon_table SET binaryField = 'binary2' WHERE id = 1").collect()
    }
    assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting")
    || exception.getMessage.contains("UPDATE TABLE is not supported temporarily."))

    exception = intercept[Exception] {
      sql("DELETE FROM carbon_table WHERE id = 1").collect()
    }
    assert(exception.getMessage.contains("only CarbonData table support delete operation")
    || exception.getMessage.contains("DELETE TABLE is not supported temporarily."))
  }

  test("test load on parquet table") {
    sql("drop table if exists parquet_table")
    sql("create table parquet_table(empno int, empname string, projdate Date) using parquet")
    var ex = intercept[AnalysisException] {
      sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE parquet_table
          """.stripMargin)
    }
    assert(ex.getMessage
      .contains("LOAD DATA is not supported for datasource tables: `default`.`parquet_table`"))
    ex = intercept[AnalysisException] {
      sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE parquet_table
           |OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    }
    assert(ex.getMessage.contains("mismatched input"))
    sql("drop table if exists parquet_table")
  }

  test("test array of binary data type with sparkfileformat ") {
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
    sql("create table if not exists carbon_table(id int, label boolean, name string," +
        "binaryField array<binary>, autoLabel boolean) using carbon")
    sql("insert into carbon_table values(1,true,'abc',array('binary'),false)")
    sql("create table if not exists parquet_table(id int, label boolean, name string," +
        "binaryField array<binary>, autoLabel boolean) using parquet")
    sql("insert into parquet_table values(1,true,'abc',array('binary'),false)")
    checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"),
      sql("SELECT binaryField[0] FROM parquet_table"))
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
  }

  test("test struct of binary data type with sparkfileformat ") {
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
    sql("create table if not exists carbon_table(id int, label boolean, name string," +
        "binaryField struct<b:binary>, autoLabel boolean) using carbon")
    sql("insert into carbon_table values(1,true,'abc',named_struct('b','binary'),false)")
    sql("create table if not exists parquet_table(id int, label boolean, name string," +
        "binaryField struct<b:binary>, autoLabel boolean) using parquet")
    sql("insert into parquet_table values(1,true,'abc',named_struct('b','binary'),false)")
    checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
      sql("SELECT binaryField.b FROM parquet_table"))
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
  }

  test("test map of binary data type with sparkfileformat") {
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
    sql("create table if not exists parquet_table(id int, label boolean, name string," +
        "binaryField map<int, binary>, autoLabel boolean) using parquet")
    sql("insert into parquet_table values(1,true,'abc',map(1,'binary'),false)")
    sql("create table if not exists carbon_table(id int, label boolean, name string," +
        "binaryField map<int, binary>, autoLabel boolean) using carbon")
    sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)")
    checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"),
      sql("SELECT binaryField[1] FROM parquet_table"))
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
  }

  test("test map of array and struct binary data type with sparkfileformat") {
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
    sql("create table if not exists parquet_table(id int, label boolean, name string," +
        "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " +
        "using parquet")
    sql("insert into parquet_table values(1,true,'abc',map(1,array('binary')),map(1," +
        "named_struct('b','binary')))")
    sql("create table if not exists carbon_table(id int, label boolean, name string," +
        "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " +
        "using carbon")
    sql("insert into carbon_table values(1,true,'abc',map(1,array('binary')),map(1," +
        "named_struct('b','binary')))")
    checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"),
      sql("SELECT binaryField1[1][1] FROM parquet_table"))
    checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
      sql("SELECT binaryField2[1].b FROM parquet_table"))
    sql("drop table if exists carbon_table")
  }

  test("test of array of struct and struct of array of binary data type with sparkfileformat") {
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
    sql("create table if not exists parquet_table(id int, label boolean, name string," +
        "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " +
        "using parquet")
    sql("insert into parquet_table values(1,true,'abc',array(named_struct('b1','binary'))," +
        "named_struct('b2',array('binary')))")
    sql("create table if not exists carbon_table(id int, label boolean, name string," +
        "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " +
        "using carbon")
    sql("insert into carbon_table values(1,true,'abc',array(named_struct('b1','binary'))," +
        "named_struct('b2',array('binary')))")
    checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"),
      sql("SELECT  binaryField1[1].b1 FROM parquet_table"))
    checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"),
      sql("SELECT binaryField2.b2[0] FROM parquet_table"))
    sql("drop table if exists carbon_table")
    sql("drop table if exists parquet_table")
  }

}
